package com.lianda.log;

import com.lianda.alert.model.LogEvent;
import com.lianda.connectors.utils.ExecutionEnvUtil;
import com.lianda.connectors.utils.GsonUtil;
import com.lianda.log.function.OriLog2LogEventFlatMapFunction;
import com.lianda.log.schema.OriginalLogEventSchema;
import com.lianda.utils.ESSinkUtil;
import com.lianda.utils.RestClientFactoryImpl;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch.util.RetryRejectedExecutionFailureHandler;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.table.descriptors.Elasticsearch;
import org.apache.http.HttpHost;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.xcontent.XContentType;

import java.net.MalformedURLException;
import java.util.List;
import java.util.Properties;

@Slf4j
public class LogSink2ES {

    /**
     * Flink job entry point: consumes raw log records from Kafka, converts them to
     * {@link LogEvent}s via {@link OriLog2LogEventFlatMapFunction}, and indexes them
     * into Elasticsearch 6.
     *
     * <p>Configuration is read through {@link ParameterTool} (command-line args /
     * properties resolved by {@code ExecutionEnvUtil.createParameterTool}); every key
     * falls back to the previous hard-coded value, so existing deployments behave the
     * same without any new configuration:
     * <ul>
     *   <li>{@code kafka.topic} — source topic (default: "" — was hard-coded; must be
     *       set for the job to consume anything)</li>
     *   <li>{@code es.hosts} — comma-separated ES addresses (default: "localhost:9200")</li>
     *   <li>{@code es.bulk.flush.max.actions} — bulk flush threshold (default: 1)</li>
     *   <li>{@code es.sink.parallelism} — sink operator parallelism (default: the
     *       environment's parallelism, i.e. Flink's normal behavior)</li>
     * </ul>
     *
     * @param args command-line arguments forwarded to {@code ParameterTool}
     * @throws Exception if the job graph fails to submit or execute
     */
    public static void main(String[] args) throws Exception {
        // 1. Set up the execution environment from external configuration.
        final ParameterTool parameterTool = ExecutionEnvUtil.createParameterTool(args);
        StreamExecutionEnvironment env = ExecutionEnvUtil.prepare(parameterTool);

        // 2. Consume raw log data from Kafka.
        // NOTE(review): the consumer Properties are empty — bootstrap.servers / group.id
        // are presumably injected elsewhere or expected here; confirm before deploying.
        Properties properties = new Properties();

        FlinkKafkaConsumer011 kafkaConsumer = new FlinkKafkaConsumer011(
                parameterTool.get("kafka.topic", ""), new OriginalLogEventSchema(), properties);
        SingleOutputStreamOperator<LogEvent> logDataStream = env.addSource(kafkaConsumer)
                .flatMap(new OriLog2LogEventFlatMapFunction());

        // 3. Sink the parsed events into Elasticsearch.
        List<HttpHost> esAddresses;
        try {
            esAddresses = ESSinkUtil.getEsAddresses(parameterTool.get("es.hosts", "localhost:9200"));
        } catch (MalformedURLException e) {
            // Without a valid ES address the job cannot do anything useful; abort early.
            log.error("get es address has an error", e);
            return;
        }

        ElasticsearchSink.Builder<LogEvent> esSinkBuilder = new ElasticsearchSink.Builder<>(
                esAddresses,
                new ElasticsearchSinkFunction<LogEvent>() {
                    @Override
                    public void process(LogEvent logEvent, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {
                        // Debug level: serializing and logging every record at info
                        // would flood the logs on a production stream.
                        log.debug("data===>{}", GsonUtil.toJson(logEvent));
                        // NOTE(review): index/type names look like placeholders —
                        // consider renaming before this reaches production.
                        requestIndexer.add(Requests.indexRequest()
                                .index("fuck_log")
                                .type("fuck")
                                .source(GsonUtil.toJSONBytes(logEvent), XContentType.JSON));
                    }
                }
        );

        // Default of 1 preserves the original flush-every-record behavior; raise via
        // es.bulk.flush.max.actions for real throughput.
        esSinkBuilder.setBulkFlushMaxActions(parameterTool.getInt("es.bulk.flush.max.actions", 1));
        // Required when the target index does not exist yet.
        esSinkBuilder.setRestClientFactory(new RestClientFactoryImpl());
        // Retry bulk requests that ES rejected due to a full queue instead of failing the job.
        esSinkBuilder.setFailureHandler(new RetryRejectedExecutionFailureHandler());

        logDataStream.addSink(esSinkBuilder.build())
                // Defaults to the environment parallelism, i.e. unchanged behavior.
                .setParallelism(parameterTool.getInt("es.sink.parallelism", env.getParallelism()));
        env.execute("log sink");
    }
}
